package pub.ryan.dw.pub.ryan.dw.idmp

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.commons.codec.digest.DigestUtils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import pub.ryan.commons.util.SparkUtil

// Incorporate the previous day's id-mapping dictionary into today's log data
// Builds today's id-mapping (idmp) dictionary: extracts identifiers from three log
// sources, links co-occurring identifiers via GraphX connected components, and merges
// the result with yesterday's dictionary so each user keeps a stable guid across days.
// NOTE(review): `main` lives in a `class`; spark-submit needs a static entry point,
// so this should likely be an `object` — confirm how the job is launched before changing.
class LogDataidmp2 {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkUtil.getSparkSession()
    // implicits needed for .toDF at the end
    import sparkSession.implicits._

    // Load today's three log sources (web / app / wxapp).
    val weblog: Dataset[String] = sparkSession.read.textFile("D:\\Project\\p01\\logs\\2020-01-12\\web")
    val applog: Dataset[String] = sparkSession.read.textFile("D:\\Project\\p01\\logs\\2020-01-12\\app")
    val wxapplog: Dataset[String] = sparkSession.read.textFile("D:\\Project\\p01\\logs\\2020-01-12\\wxapp")

    // Extract the identifier fields from every line of each source, then union them.
    val ids: RDD[Array[String]] =
      logds(applog).union(logds(weblog)).union(logds(wxapplog))

    // Graph vertices: one vertex per identifier, keyed by its hashCode.
    // NOTE(review): hashCode collisions would merge unrelated identifiers — confirm
    // this approximation is acceptable (original code had the same limitation).
    val vertices: RDD[(Long, String)] = ids.flatMap(arr =>
      arr.map(ele => (ele.hashCode.toLong, ele))
    )

    // Graph edges: every pairwise combination of identifiers co-occurring in one line.
    val edges: RDD[Edge[String]] = ids.flatMap(arr => {
      for (i <- arr.indices; j <- i + 1 until arr.length)
        yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
    })
      // Word-count style: count each edge's occurrences and keep only strong links.
      // NOTE(review): the original comment said "filter < 2" but the predicate keeps
      // count > 2 (i.e. >= 3 occurrences) — confirm the intended threshold.
      .map(edge => (edge, 1))
      .reduceByKey(_ + _)
      .filter(_._2 > 2)
      .map(_._1)

    // Load yesterday's idmp dictionary (parquet, columns gid_hc -> gid) once and derive
    // vertices, edges and the driver-side lookup map from the same pair RDD. Access is
    // by column name (getAs) rather than a positional Row match, so it no longer throws
    // MatchError if the column order in the parquet file differs.
    val yestodayIdmp: DataFrame = sparkSession.read.parquet("data\\idmp\\2020-01-11\\")
    val yestodayPairs: RDD[(VertexId, VertexId)] = yestodayIdmp.rdd.map(row =>
      (row.getAs[VertexId]("gid_hc"), row.getAs[VertexId]("gid"))
    )
    val yestodayIdmpVertices: RDD[(VertexId, String)] =
      yestodayPairs.map { case (idFlag, _) => (idFlag, "") }
    val yestodayEdges: RDD[Edge[String]] =
      yestodayPairs.map { case (idFlag, gid) => Edge(idFlag, gid, "") }

    // Union today's and yesterday's vertices/edges, build the graph, and compute
    // connected components: each component is one logical user; the component id
    // (minimum vertex id) serves as the guid.
    val graph: Graph[String, String] = Graph(vertices.union(yestodayIdmpVertices), edges.union(yestodayEdges))
    val res_vertex: VertexRDD[VertexId] = graph.connectedComponents().vertices

    // Collect yesterday's idFlag -> guid dictionary on the driver and broadcast it so
    // today's components can reuse yesterday's guids instead of minting new ones.
    val idMap: collection.Map[VertexId, VertexId] = yestodayPairs.collectAsMap()
    val broadcastValue: Broadcast[collection.Map[VertexId, VertexId]] =
      sparkSession.sparkContext.broadcast(idMap)

    // Group today's component result by guid; if any id in a group already had a guid
    // yesterday, the first such guid (in iteration order) replaces today's guid.
    val todayImplResult: RDD[(VertexId, VertexId)] = res_vertex
      .map { case (id, gid) => (gid, id) }
      .groupByKey()
      // Read the broadcast once per partition instead of once per record.
      .mapPartitions(iter => {
        val idmpMap: collection.Map[VertexId, VertexId] = broadcastValue.value
        iter.map { case (todayGid, todayids) =>
          // First id of this group found in yesterday's dictionary wins; otherwise
          // keep today's freshly computed guid.
          val mergedGid: VertexId =
            todayids.collectFirst(Function.unlift(idmpMap.get)).getOrElse(todayGid)
          (mergedGid, todayids)
        }
      })
      // Flatten back to one (idFlag, guid) row per identifier.
      .flatMap { case (gid, groupIds) => groupIds.map(id => (id, gid)) }

    // Persist today's idFlag -> guid dictionary for tomorrow's run.
    todayImplResult.toDF("gid_hc", "gid").write.parquet("data\\idmp\\2020-01-12\\")
    sparkSession.close()
  }

  /**
   * Extracts the identifier fields (uid, imei, mac, imsi, androidId, deviceId, uuid)
   * from each JSON log line, dropping blank values.
   *
   * All three log sources currently share this JSON structure ("user" object with a
   * nested "phone" object); handle each source separately if the formats diverge.
   *
   * @param applog one log source as a Dataset of raw JSON lines
   * @return an RDD of non-blank identifier arrays, one array per input line
   */
  private def logds(applog: Dataset[String]): RDD[Array[String]] = {
    applog.rdd.map(line => {
      val jsonObject: JSONObject = JSON.parseObject(line)
      val userObj: JSONObject = jsonObject.getJSONObject("user")
      val phoneObj: JSONObject = userObj.getJSONObject("phone")
      Array(
        userObj.getString("uid"),
        phoneObj.getString("imei"),
        phoneObj.getString("mac"),
        phoneObj.getString("imsi"),
        phoneObj.getString("androidId"),
        phoneObj.getString("deviceId"),
        phoneObj.getString("uuid")
      ).filter(StringUtils.isNotBlank(_))
    })
  }
}
